package com.atuguigu.flink.Day06;

import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.cep.CEP;
import org.apache.flink.cep.PatternSelectFunction;
import org.apache.flink.cep.PatternStream;
import org.apache.flink.cep.pattern.Pattern;
import org.apache.flink.cep.pattern.conditions.SimpleCondition;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;

import java.sql.Timestamp;
import java.util.List;
import java.util.Map;

public class Example3 {
    /**
     * Builds and runs a small Flink CEP job over a fixed, in-memory list of events.
     *
     * <p>The events are keyed by {@code userId}; for each key the job looks for three
     * events whose type equals {@code "file"} within a five-second window and prints
     * one summary line per match containing the user id and the three addresses.
     *
     * @param args command-line arguments (not used)
     * @throws Exception if building or executing the job fails
     */
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        // Event time is taken directly from each event's own timestamp field.
        SerializableTimestampAssigner<Example2.Event> eventTimeAssigner =
                new SerializableTimestampAssigner<Example2.Event>() {
                    @Override
                    public long extractTimestamp(Example2.Event element, long recordTimestamp) {
                        return element.timestamp;
                    }
                };
        WatermarkStrategy<Example2.Event> watermarkStrategy = WatermarkStrategy
                .<Example2.Event>forMonotonousTimestamps()
                .withTimestampAssigner(eventTimeAssigner);

        // Fixed sample input with ascending timestamps.
        SingleOutputStreamOperator<Example2.Event> loginStream = env
                .fromElements(
                        new Example2.Event("user1", "file", "local-1", 1000L),
                        new Example2.Event("user2", "file", "0.0.0.0", 2000L),
                        new Example2.Event("user2", "success", "0.0.0.0", 3000L),
                        new Example2.Event("user1", "file", "local-2", 4000L),
                        new Example2.Event("user1", "file", "local-3", 4500L))
                .assignTimestampsAndWatermarks(watermarkStrategy);

        // Define the pattern to look for: three "file" events within five seconds.
        SimpleCondition<Example2.Event> isFileEvent = new SimpleCondition<Example2.Event>() {
            @Override
            public boolean filter(Example2.Event value) throws Exception {
                return value.enentType.equals("file");
            }
        };
        Pattern<Example2.Event, Example2.Event> pattern = Pattern
                .<Example2.Event>begin("file")
                .where(isFileEvent)
                .times(3)
                .within(Time.seconds(5));

        // Apply the pattern to the stream keyed by user id.
        PatternStream<Example2.Event> patternStream =
                CEP.pattern(loginStream.keyBy(r -> r.userId), pattern);

        // Turn every match into a one-line summary.
        PatternSelectFunction<Example2.Event, String> describeMatch =
                new PatternSelectFunction<Example2.Event, String>() {
                    @Override
                    public String select(Map<String, List<Example2.Event>> map) throws Exception {
                        // All three matched events are stored under the pattern name "file".
                        List<Example2.Event> matched = map.get("file");
                        Example2.Event first = matched.get(0);
                        Example2.Event second = matched.get(1);
                        Example2.Event third = matched.get(2);
                        return "用户" + first.userId + "在"
                                + first.ipadress + ";"
                                + second.ipadress + ";"
                                + third.ipadress + ";"
                                + "登录失败，可能是爬虫";
                    }
                };

        // Print the summaries of the matched sequences.
        patternStream
                .select(describeMatch)
                .print();

        env.execute();
    }

    /**
     * Simple event record with public fields and a no-argument constructor.
     *
     * <p>Field names (including their spelling) are part of the class's public
     * surface and are therefore left unchanged.
     */
    public static class Event {
        /** Identifier of the user the event belongs to. */
        public String userId;
        /** Type label of the event, e.g. {@code "file"} or {@code "success"}. */
        public String enentType;
        /** Address string attached to the event. */
        public String ipadress;
        /** Event time as epoch milliseconds; {@code null} until it is set. */
        public Long timestamp;

        /** Creates an event with every field left {@code null}. */
        public Event() {
        }

        /**
         * Creates a fully populated event.
         *
         * @param userId    identifier of the user
         * @param enentType type label of the event
         * @param ipadress  address string attached to the event
         * @param timestamp event time as epoch milliseconds, may be {@code null}
         */
        public Event(String userId, String enentType, String ipadress, Long timestamp) {
            this.userId = userId;
            this.enentType = enentType;
            this.ipadress = ipadress;
            this.timestamp = timestamp;
        }

        /**
         * Renders all fields; the timestamp is shown as a {@link Timestamp}.
         *
         * <p>A {@code null} timestamp (for example on an instance created with the
         * no-argument constructor) is rendered as {@code null} instead of failing
         * with a {@link NullPointerException} while unboxing.
         *
         * @return a human-readable description of this event
         */
        @Override
        public String toString() {
            // Guard the unboxing: new Timestamp(long) would throw on a null Long.
            String renderedTime = timestamp == null
                    ? "null"
                    : new Timestamp(timestamp).toString();
            return "Event{" +
                    "userId='" + userId + '\'' +
                    ", enentType='" + enentType + '\'' +
                    ", ipadress='" + ipadress + '\'' +
                    ", timestamp=" + renderedTime +
                    '}';
        }
    }
}

